/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.engine.meta.crawler

import com.chinamobile.cmss.lakehouse.engine.meta.crawler.model.Codec._
import com.chinamobile.cmss.lakehouse.engine.meta.crawler.model.{SyncTable, Table}
import io.circe.syntax._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.slf4j.LoggerFactory
import scalaj.http.Http

/**
 * Flink sink that publishes every crawled [[Table]] to an HTTP endpoint.
 *
 * Each table is wrapped in a [[SyncTable]] together with the job id, the
 * table-change setting, the target prefix/instance and the OSS connection
 * settings taken from `crawlerConfig`, encoded as JSON and sent with an
 * HTTP POST to `url`.
 *
 * @param crawlerConfig crawler job configuration; its `source` must be an `OssSource`
 * @param url           endpoint that receives the JSON payload
 */
class HttpSink(crawlerConfig: CrawlerConfig, url: String)
  extends SinkFunction[Table] {

  // Transient + lazy: the logger is excluded from Java serialization of this
  // function and is looked up again on first use.
  @transient private lazy val logger = LoggerFactory.getLogger(getClass)

  /**
   * Posts one table to the configured endpoint.
   *
   * A non-2xx response is reported at error level and is not rethrown.
   * Exceptions raised by the HTTP client itself are not caught here.
   *
   * @param table   table metadata to publish
   * @param context sink context supplied by Flink (not used)
   * @throws IllegalArgumentException if the configured source is not an `OssSource`
   */
  override def invoke(table: Table, context: SinkFunction.Context): Unit = {
    val source = crawlerConfig.source match {
      case s: OssSource => s
      case other =>
        // Fail with an explicit message instead of a bare scala.MatchError.
        // Only the class name is reported so no source settings end up in the message.
        val sourceType = Option(other).map(_.getClass.getName).getOrElse("null")
        throw new IllegalArgumentException(
          s"HttpSink only supports OssSource, but job ${crawlerConfig.jobId} uses $sourceType"
        )
    }
    val syncTable = SyncTable(
      crawlerConfig.jobId,
      crawlerConfig.tableChangeSetting,
      crawlerConfig.target.tablePrefix,
      crawlerConfig.target.instance,
      source.ak,
      source.sk,
      source.endpoint,
      table
    )
    val json = syncTable.asJson
    // Log only the table name: the SyncTable carries the source's ak/sk
    // values, which must not be written to the job logs.
    logger.info("sink table {} start", table.tableName)
    logger.info("sink url {}", url)
    val response = Http(url)
      .header("Content-Type", "application/json;charset=UTF-8")
      .header("User-Id", crawlerConfig.user)
      .header("Request-Id", crawlerConfig.jobId)
      .postData(json.toString())
      .asString
    if (response.is2xx) {
      logger.info("sink table {} success", table.tableName)
    } else {
      logger.error(
        s"sink table ${table.tableName} failure due to ${response.body}"
      )
    }
  }

}

/** Companion factory for [[HttpSink]]. */
object HttpSink {

  /**
   * Builds a sink for the given crawler configuration and target endpoint.
   *
   * @param crawlerConfig crawler job configuration handed to the sink
   * @param url           endpoint the sink will send its requests to
   * @return a new [[HttpSink]] instance
   */
  def apply(crawlerConfig: CrawlerConfig, url: String): HttpSink = {
    new HttpSink(crawlerConfig, url)
  }

}
